This notebook can be executed in a notebook server hosted on KubeFlow. You can find instructions on how to deploy a KubeFlow cluster and how to access the KubeFlow UI and the hosted notebooks here: https://www.kubeflow.org/docs/pipelines/pipelines-quickstart/

Please install the KubeFlow Pipelines SDK using the following command:


In [ ]:
!pip3 install 'https://storage.googleapis.com/ml-pipeline/release/0.1.9/kfp.tar.gz'
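
After the installation finishes, you may need to restart the notebook kernel before the newly installed package can be imported. As an optional sanity check, the following minimal sketch imports the SDK; the __version__ attribute may not exist in every kfp release, so it is read defensively.


In [ ]:
import kfp
# Print the installed SDK version if the attribute is available in this
# release; otherwise just confirm that the package imports.
print(getattr(kfp, '__version__', 'kfp imported successfully'))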

Energy Price Forecasting Pipeline

This notebook generates a KubeFlow pipeline that runs the solution end to end. For more information on KubeFlow pipelines and how to run them on GCP, please visit https://github.com/kubeflow/pipelines


In [ ]:
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook

# Please modify the following values to match your GCP bucket, project, and docker image name.
OUTPUT_DIR = 'gs://pipelinestest/out'
PROJECT_NAME = 'energy-forecasting'
EF_IMAGE = 'gcr.io/%s/energy:dev' % PROJECT_NAME

Create base image

This image uses tensorflow/tensorflow:1.10.0-py3 as its base and installs the Python libraries and applications required by some of the components in the pipeline.


In [ ]:
%%docker {EF_IMAGE} {OUTPUT_DIR}
FROM tensorflow/tensorflow:1.10.0-py3
RUN apt-get update && apt-get install -y git
RUN pip3 install --upgrade google-api-python-client
RUN pip3 install --upgrade pyarrow
RUN pip3 install --upgrade google-cloud-bigquery
RUN pip3 install --upgrade google-cloud-storage
RUN pip3 install --upgrade gitpython

Create components

Each of the following cells defines the logic of one pipeline component and produces a .yaml component file for it.


In [ ]:
def copy_table(
    dataset: str) -> str:
    """Retrieves raw data from competition website.
    Retrieves raw data from the competition site and saves it in BigQuery.
    Args:
        dataset: String specifying the dataset in BigQuery to save the data in.
    Returns:
        String specifying whether the component finished successfully.
    """
    from google.cloud import bigquery
    import requests
    import pandas as pd
    from io import StringIO
    from io import BytesIO
    import zipfile
    
    bq_client = bigquery.Client()
    price_data = pd.read_csv(
        StringIO(requests.get(
            'http://complatt.smartwatt.net/assets/files/historicalRealData/RealMarketPriceDataPT.csv').text),
        sep=';'
    )
    price_data.columns = ['date_utc', 'price']
    bq_client.load_table_from_dataframe(
        price_data,
        bq_client.dataset(dataset).table(
            'MarketPricePT')).result()
    
    weather_zip = zipfile.ZipFile(
        BytesIO(requests.get(
            'http://complatt.smartwatt.net/assets/files/weatherHistoricalData/WeatherHistoricalData.zip').content))
    weather_data = pd.read_csv(
        weather_zip.open(
            'WeatherHistoricalData/historical_weather.csv'))
    bq_client.load_table_from_dataframe(
        weather_data,
        bq_client.dataset(dataset).table(
            'historical_weather')).result()
    
    return 'success'
    
    
compiler.build_python_component(
    component_func = copy_table,
    staging_gcs_path = OUTPUT_DIR,
    base_image=EF_IMAGE,
    target_component_file='copy-table.component.yaml',
    target_image = 'gcr.io/' + PROJECT_NAME + '/component-copy-table:latest')

In [ ]:
def export_table(
    inp: str,
    table: str,
    file: str) -> str:
    """Exports table to csv.
    Exports BigQuery table into CSV file.
    Args:
        inp: String containing the output from the previous component.
        table: String specifying the source BigQuery table.
        file: String specifying the destination path and name for the CSV file.
    Returns:
        String specifying whether the component finished successfully.
    """
    from google.cloud import bigquery
    bq_client = bigquery.Client()
    bq_client.extract_table(
        table,
        file).result()
    return 'success'
    
compiler.build_python_component(
    component_func = export_table,
    staging_gcs_path = OUTPUT_DIR,
    base_image=EF_IMAGE,
    target_component_file='export-table.component.yaml',
    target_image = 'gcr.io/' + PROJECT_NAME + '/component-export-table:latest')

In [ ]:
def run_git_python_script(
    inp: str,
    code_repo: str,
    code_folder: str,
    script: str,
    script_args: str) -> str:
    """Runs Python script from git repository.

    Args:
        inp: String containing the output from the previous component.
        code_repo: String specifying the URL of the git repository.
        code_folder: String specifying the folder for the script.
        script: String specifying the name of the script.
        script_args: String specifying the arguments for the script.
    Returns:
        String specifying whether the component finished successfully.
    """
    import os
    import git
    git.Git('').clone(code_repo)
    os.chdir(code_folder)
    output = os.system(' '.join([
        'python -m',
        script,
        script_args]))
    if output == 0:
        return 'success'
    raise Exception('Script failed. The exit status was: {}'.format(output))
    
compiler.build_python_component(
    component_func = run_git_python_script,
    staging_gcs_path = OUTPUT_DIR,
    base_image=EF_IMAGE,
    target_component_file='run-git-python-script.component.yaml',
    target_image = 'gcr.io/' + PROJECT_NAME + '/component-run-git-python-script:latest')

In [ ]:
def train_git_cmle_model(
    tr_inp: str,
    va_inp: str,
    code_repo: str,
    code_folder: str,
    project: str,
    bucket: str,
    package_folder: str,
    cmle_folder: str,
    scale_tier: str,
    python_module: str,
    region: str,
    runtime_version: str,
    cmle_args: str) -> str:
    """Executes CMLE training job.
    Retrieves python file from git repo and launches training job in CMLE.
    Args:
        tr_inp: String containing the output from the training data export component.
        va_inp: String containing the output from the validation data export component.
        code_repo: String specifying the URL of the git repository.
        code_folder: String specifying the folder for the job code.
        project: String specifying the GCP project where job will run.
        bucket: String specifying the GCS bucket where to save the job's outputs.
        package_folder: String specifying the python package to run for the job.
        cmle_folder: String specifying the folder in GCS where to save outputs.
        scale_tier: String specifying compute resources to use for training job.
        python_module: String specifying the python module to run for the job.
        region: String specifying the GCP region in which to run the job.
        runtime_version: String specifying the CMLE version to use for the job.
        cmle_args: String specifying the arguments for the CMLE job.
    Returns:
        String containing the CMLE API response to the job creation request.
    """
    import os
    import git
    import tarfile
    import datetime
    from google.cloud import storage
    from googleapiclient import discovery
    jobId = 'train' + datetime.datetime.today().strftime('%Y%m%d%H%M%S')
    git.Git('').clone(code_repo)
    with tarfile.open('code.tar.gz', 'w:gz') as tar:
        tar.add(
            code_folder,
            arcname=os.path.basename(code_folder))
    gcs_client = storage.Client()
    gcs_bucket = gcs_client.get_bucket(bucket)
    blob = gcs_bucket.blob(package_folder + jobId + '.tar.gz')
    blob.upload_from_filename('code.tar.gz')
    training_inputs = {
        'scaleTier': scale_tier,
        'pythonModule': python_module,
        'args': cmle_args.split(' '),
        'region': region,
        'packageUris': [
            'gs://'+ bucket + '/' + package_folder + jobId + '.tar.gz'],
        'jobDir': 'gs://'+ bucket + '/' + cmle_folder + jobId,
        'runtimeVersion': runtime_version}
    job_spec = {
        'jobId': jobId,
        'trainingInput': training_inputs}
    cloudml = discovery.build('ml', 'v1')
    project_id = 'projects/{}'.format(project)
    request = cloudml.projects().jobs().create(
        body=job_spec,
        parent=project_id)
    return str(request.execute())

    
compiler.build_python_component(
    component_func = train_git_cmle_model,
    staging_gcs_path = OUTPUT_DIR,
    base_image=EF_IMAGE,
    target_component_file='train-git-cmle-model.component.yaml',
    target_image = 'gcr.io/' + PROJECT_NAME + '/component-train-git-cmle-model:latest')

Create pipeline

The following code loads all the components needed for the pipeline, specifies the dependencies between them, defines the pipeline's arguments and their default values, and compiles the pipeline into a .tar.gz file that can be uploaded to KubeFlow Pipelines.


In [4]:
@dsl.pipeline(
    name='Energy Price Forecasting',
    description='Energy Price Forecasting')
def basic_bq_pipeline(
    project = dsl.PipelineParam(
        'project',
        value='energy-forecasting'),
    dataset = dsl.PipelineParam(
        'dataset',
        value='Energy'),
    bucket = dsl.PipelineParam(
        'bucket',
        value='energyforecast'),
    code_repo = dsl.PipelineParam(
        'code-repo',
        value='https://github.com/GoogleCloudPlatform/professional-services.git'),
    code_folder = dsl.PipelineParam(
        'code-folder',
        value='professional-services/examples/cloudml-energy-price-forecasting'),
    data_prep_script = dsl.PipelineParam(
        'data-prep-script',
        value='data_preparation.data_prep'),
    data_prep_args = dsl.PipelineParam(
        'data-prep-args',
        value=' '.join([
            '--dataset=Energy',
            '--train_table=MLDataTrain',
            '--valid_table=MLDataValid',
            '--test_table=MLDataTest',
            '--prepare_data_file=data_preparation/prepare_data.sql',
            '--weather_mean_std_file=data_preparation/weather_mean_std.sql',
            '--train_from_date="2015-01-05 00:00:00"',
            '--train_to_date="2015-10-04 23:01:00"',
            '--valid_from_date="2015-10-05 00:00:00"',
            '--valid_to_date="2015-10-11 23:01:00"',
            '--test_from_date="2015-10-12 00:00:00"',
            '--test_to_date="2015-10-18 23:01:00"',
            '--price_scaling=0.01',
            '--mean_path=gs://energyforecast/data/pickle/mean.pkl',
            '--std_path=gs://energyforecast/data/pickle/std.pkl'])),
    package_folder = dsl.PipelineParam(
        'package-folder',
        value='package/'),
    cmle_folder = dsl.PipelineParam(
        'cmle-folder',
        value='cmle/'),
    cmle_args = dsl.PipelineParam(
        'cmle-args',
        value=' '.join([
            '--training_path', 'gs://energyforecast/data/csv/MLDataTrain.csv',
            '--validation_path', 'gs://energyforecast/data/csv/MLDataValid.csv',
            '--mean_path', 'gs://energyforecast/data/pickle/mean.pkl',
            '--std_path', 'gs://energyforecast/data/pickle/std.pkl',
            '--dropout' , '0.2',
            '--hour_embedding', '20',
            '--day_embedding', '10',
            '--first_layer_size', '100',
            '--number_layers', '3',
            '--layer_reduction_fraction', '0.5',
            '--learning_rate', '0.01',
            '--batch_size', '64',
            '--eval_batch_size', '168',
            '--max_steps', '5000'])),
    scale_tier = dsl.PipelineParam(
        'scale-tier',
        value='BASIC'),
    python_module = dsl.PipelineParam(
        'python-module',
        value='trainer.task'),
    region = dsl.PipelineParam(
        'region',
        value='us-central1'),
    runtime_version = dsl.PipelineParam(
        'runtime-version',
        value='1.10'),
    train_table = dsl.PipelineParam(
        'train-table',
        value='Energy.MLDataTrain'),
    valid_table = dsl.PipelineParam(
        'valid-table',
        value='Energy.MLDataValid'),
    test_table = dsl.PipelineParam(
        'test-table',
        value='Energy.MLDataTest'),
    train_file = dsl.PipelineParam(
        'train-file',
        value='gs://energyforecast/data/csv/MLDataTrain.csv'),
    valid_file = dsl.PipelineParam(
        'valid-file',
        value='gs://energyforecast/data/csv/MLDataValid.csv'),
    test_file = dsl.PipelineParam(
        'test-file',
        value='gs://energyforecast/data/csv/MLDataTest.csv')):
    
    CopTableOp = kfp.components.load_component('copy-table.component.yaml')
    ExpTableOp = kfp.components.load_component('export-table.component.yaml')
    DataPrepOp = kfp.components.load_component('run-git-python-script.component.yaml')
    TrainModelOp = kfp.components.load_component('train-git-cmle-model.component.yaml')
    
    
    ct_op = CopTableOp(
        dataset).apply(gcp.use_gcp_secret('user-gcp-sa'))
    dp_op = DataPrepOp(
        ct_op.output,
        code_repo,
        code_folder,
        data_prep_script,
        data_prep_args).apply(gcp.use_gcp_secret('user-gcp-sa'))
    tr_et_op = ExpTableOp(
        dp_op.output,
        train_table,
        train_file).apply(gcp.use_gcp_secret('user-gcp-sa'))
    va_et_op = ExpTableOp(
        dp_op.output,
        valid_table,
        valid_file).apply(gcp.use_gcp_secret('user-gcp-sa'))
    te_et_op = ExpTableOp(
        dp_op.output,
        test_table,
        test_file).apply(gcp.use_gcp_secret('user-gcp-sa'))
    tm_op = TrainModelOp(
        tr_et_op.output,
        va_et_op.output,
        code_repo,
        code_folder,
        project,
        bucket,
        package_folder,
        cmle_folder,
        scale_tier,
        python_module,
        region,
        runtime_version,
        cmle_args).apply(gcp.use_gcp_secret('user-gcp-sa'))
    
compiler.Compiler().compile(basic_bq_pipeline, 'energy-forecasting.tar.gz')
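
The resulting energy-forecasting.tar.gz file can be uploaded through the KubeFlow Pipelines UI. Alternatively, the following sketch submits it directly from this notebook; it assumes the notebook is running inside the KubeFlow cluster so that kfp.Client() can reach the pipelines API without extra configuration, and the experiment and run names below are arbitrary placeholders.


In [ ]:
# Sketch only: assumes in-cluster access to the KubeFlow Pipelines API.
client = kfp.Client()

# 'energy-forecasting' is an arbitrary experiment name chosen for illustration.
experiment = client.create_experiment(name='energy-forecasting')

# Submit the compiled package as a new run; pipeline parameters can be
# overridden through the params argument if needed.
run = client.run_pipeline(
    experiment.id,
    job_name='energy-forecasting-run',
    pipeline_package_path='energy-forecasting.tar.gz')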

In [ ]: